/*
 * Dijjer - A Peer to Peer HTTP Cache
 * Copyright (C) 2004,2005 Change.Tv, Inc
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 * 
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 */
package dijjer.io.download;

import java.io.OutputStream;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;

import dijjer.AbstractFactory;
import dijjer.Dijjer;
import dijjer.io.BlockInfo;
import dijjer.io.comm.AbstractPeer;
import dijjer.io.comm.DMT;
import dijjer.io.comm.Dispatcher;
import dijjer.io.store.Store;
import dijjer.io.xfer.OutputStreamBlockTransmitter;
import dijjer.io.xfer.PartiallyReceivedBlock;
import dijjer.io.xfer.http.HttpBlockReceiver;
import dijjer.util.Misc;
import dijjer.util.TruncatingOutputStream;
import dijjer.util.VeryLongInteger;
import dijjer.util.logging.Logger;

/**
 * A {@link Job} that fetches the blocks <code>firstBlock..lastBlock</code> of a URL and writes them, in block
 * order, to an {@link OutputStream}.
 * <p>
 * Two roles cooperate through the instance monitor:
 * <ul>
 * <li>{@link #start()} is the consumer: it takes queued blocks one by one, streams each to the output stream and,
 * when a hash for that block has been retrieved, checks the block contents against it.</li>
 * <li>{@link #performNextTask()} is the producer: each call either retrieves the hash of a block that has already
 * been requested, or queues and retrieves the next block. (Presumably invoked by the worker threads of
 * <code>DownloaderThread</code> while {@link #readyForNextTask()} is true -- confirm against that class.)</li>
 * </ul>
 * All mutable bookkeeping (<code>_pending</code>, <code>_retrievedHashes</code>, the block/hash counters and
 * <code>_fromCacheCount</code>) is guarded by <code>synchronized (this)</code>.
 *
 * @author ian
 */
public class Download implements Job {

	/** Set once by {@link #abort()}; volatile so the unsynchronized early-out checks observe it. */
	volatile boolean _aborted = false;
	/** Destination of the downloaded data; {@link #getLength()} and {@link #getSent()} cast it. */
	OutputStream _os;
	/** Queue-size threshold used by {@link #readyForNextTask()} to throttle block requests. */
	public static final int MAX_PENDING = 8;
	int _firstBlock, _lastBlock, _nextBlockToDownload, _nextHashToDownload, _fromCacheCount = 0;
	/** Blocks (as {@link BlockPair}s) that have been requested but not yet handed to the output stream. */
	LinkedList _pending = new LinkedList();
	/** Maps block number (Integer) to the VeryLongInteger hash retrieved for that block. */
	HashMap _retrievedHashes = new HashMap();
	URL _url;
	long _length;
	String _lastModified;

	/**
	 * @param url          resource being downloaded
	 * @param length       total length of the resource, reported by {@link #getTotalFileLength()}
	 * @param lastModified last-modified value passed on to every {@link BlockInfo}
	 * @param firstBlock   first block number to fetch (inclusive)
	 * @param lastBlock    last block number to fetch (inclusive)
	 * @param os           stream the blocks are written to
	 */
	public Download(URL url, long length, String lastModified, int firstBlock, int lastBlock, OutputStream os) {
		_url = url;
		_length = length;
		_lastModified = lastModified;
		_firstBlock = firstBlock;
		_lastBlock = lastBlock;
		_nextBlockToDownload = _nextHashToDownload = _firstBlock;
		_os = os;
	}

	/**
	 * Registers this job, then streams every block from first to last to the output stream, blocking while no
	 * block is queued. Returns when all blocks were written or the download was aborted; the job is always
	 * unregistered on the way out.
	 */
	public void start() {
		DownloaderThread.addJob(this);
		// An interrupt does not cancel the download (use abort() for that); it is remembered and re-asserted
		// on exit so the caller can still observe it.
		boolean interrupted = false;
		try {
			for (int block = _firstBlock; block <= _lastBlock; block++) {
				BlockPair bp;
				synchronized (this) {
					// The guard is evaluated under the same lock the producers notify on, so a notify issued
					// between the emptiness check and the wait() can no longer be missed.
					while (!_aborted && _pending.isEmpty()) {
						try {
							this.wait();
						} catch (InterruptedException e) {
							interrupted = true;
						}
					}
					if (_aborted) {
						break;
					}
					Logger.info("Removing block " + block + " from pending queue");
					bp = (BlockPair) _pending.removeFirst();
				}
				OutputStreamBlockTransmitter osbt = new OutputStreamBlockTransmitter(bp.getPartiallyReceivedBlock(), _os);
				try {
					Logger.info("Starting transfer of block " + block);
					osbt.start();
					Logger.info("Completed transfer of block " + block);
					// Confirm that the hash matches
					VeryLongInteger retrievedHash;
					synchronized (this) {
						retrievedHash = (VeryLongInteger) _retrievedHashes.get(new Integer(block));
					}
					if (retrievedHash != null) {
						Logger.info("Verifying hash for block "+block);
						VeryLongInteger actualHash = new VeryLongInteger(bp.getPartiallyReceivedBlock().getBlock());
						if (!actualHash.equals(retrievedHash)) {
							Logger.warning("Hash check for block "+block+" failed");
							handleHashVerificationFailure(retrievedHash, actualHash, bp.getBlockInfo());
						}
					}
				} catch (Exception e) {
					Logger.error("Exception writing data to OutputStream, aborting download", e);
					// The loop ends at the top of the next iteration, where _aborted is checked.
					this.abort();
				}
			}
		} finally {
			DownloaderThread.removeJob(this);
			if (interrupted) {
				Thread.currentThread().interrupt();
			}
		}
	}

	/**
	 * @return true while the download is not aborted, the pending queue holds at most {@link #MAX_PENDING}
	 *         blocks (the comparison is inclusive, so the queue can reach MAX_PENDING + 1 entries), and there is
	 *         still a block or a hash left to request
	 */
	public synchronized boolean readyForNextTask() {
		return (!_aborted) && _pending.size() <= MAX_PENDING
				&& (_nextBlockToDownload <= _lastBlock || _nextHashToDownload <= _lastBlock);
	}

	/**
	 * Marks the download as aborted and wakes {@link #start()} if it is waiting for a block.
	 */
	public void abort() {
		synchronized (this) {
			// Set under the lock so the waiter cannot test the flag, miss the notification and then sleep.
			_aborted = true;
			this.notifyAll();
		}
	}

	/**
	 * @return number of blocks for which the dispatcher's retrieveData call returned true, multiplied by
	 *         Store.DATA_BLOCK_SIZE
	 */
	public synchronized long getBytesFromCache() {
		// Widen before multiplying so the product cannot overflow int.
		return (long) _fromCacheCount * Store.DATA_BLOCK_SIZE;
	}

	/**
	 * Performs one unit of work: retrieves the hash of the oldest requested block that has no hash request yet,
	 * or, when hashes have caught up with blocks, queues and retrieves the next block. Does nothing if the
	 * download is aborted or nothing is left to request.
	 */
	public void performNextTask() {
		if (_aborted) {
			return;
		}
		int blockNo;
		boolean hashTask;
		PartiallyReceivedBlock block = null;
		BlockInfo bi = null;
		// Decide and claim the task atomically; deciding outside the lock let two threads act on the same
		// stale comparison of the counters.
		synchronized (this) {
			hashTask = _nextHashToDownload < _nextBlockToDownload;
			if (hashTask) {
				blockNo = _nextHashToDownload++;
			} else if (_nextBlockToDownload > _lastBlock) {
				// Everything has been requested already. Queuing a block past _lastBlock would leave an entry
				// in _pending that start() never consumes.
				return;
			} else {
				blockNo = _nextBlockToDownload++;
				bi = new BlockInfo(_url, _length, _lastModified, blockNo);
				block = new PartiallyReceivedBlock(Dijjer.PACKETS_IN_BLOCK, Dijjer.PACKET_SIZE);
				_pending.addLast(new BlockPair(bi, block));
				this.notifyAll();
			}
		}
		if (hashTask) {
			fetchHash(blockNo);
		} else {
			fetchBlock(blockNo, bi, block);
		}
	}

	/** Retrieves the hash for one block through the dispatcher and records it if one was returned. */
	private void fetchHash(int blockNo) {
		Logger.info("Prepairing to retrieve hash " + blockNo);
		int uid = Misc.nextInt();
		Dispatcher.getDispatcher().registerUid(uid);
		VeryLongInteger checkHash = null;
		try {
			checkHash = Dispatcher.getDispatcher().retrieveHash(
					new BlockInfo(_url, _length, _lastModified, blockNo), 10, uid, false);
		} catch (Exception e) {
			Logger.warning("Failed to retrieve hash " + e.getMessage());
		} finally {
			// Always release the uid, whatever retrieveHash threw.
			Dispatcher.getDispatcher().unregisterUid(uid);
		}
		if (checkHash != null) {
			synchronized (this) {
				_retrievedHashes.put(new Integer(blockNo), checkHash);
			}
		}
		Logger.info("Done with retrieval of hash " + blockNo);
	}

	/**
	 * Retrieves the data of one already-queued block; on failure retries once with a TTL of 0 and aborts the
	 * download if that fails too.
	 */
	private void fetchBlock(int blockNo, BlockInfo bi, PartiallyReceivedBlock block) {
		try {
			// If the remote transfer fails we don't want the transfer to the client to be aborted, rather
			// we want to rerequest with TTL of 0, so we tell the PartiallyRetrievedBlock to ignore any
			// abort
			block.setIgnoreAbort(true);
			boolean fromCache = Dispatcher.getDispatcher().retrieveData(bi, 15, block, this);
			if (fromCache) {
				synchronized (this) {
					_fromCacheCount++;
				}
			}
			block.setIgnoreAbort(false);
		} catch (Exception e) {
			Logger.warning("Error during download of block " + blockNo + ": " + e.getMessage());
			// NOTE(review): on this path setIgnoreAbort(false) is never called, so the block keeps ignoring
			// aborts during and after the retry. Left as in the original; confirm whether that is intended.
			try {
				Logger.info("Retrying block " + blockNo + " with TTL of 0");
				Dispatcher.getDispatcher().retrieveData(new BlockInfo(_url, _length, _lastModified, blockNo), 0,
						block, this);
			} catch (Exception e1) {
				Logger.error("Error during direct download of block " + blockNo + ", aborting", e1);
				this.abort();
			}
		}
		Logger.info("Done with retrieval of block " + blockNo);
	}

	/** @return the file part of the URL being downloaded */
	public String getFileName() {
		return _url.getFile();
	}

	/**
	 * @return the length reported by the output stream
	 * @throws ClassCastException if the stream given to the constructor is not a TruncatingOutputStream
	 */
	public long getLength() {
		return ((TruncatingOutputStream) _os).getLength();
	}

	/**
	 * @return the number of bytes the output stream reports as transmitted
	 * @throws ClassCastException if the stream given to the constructor is not a TruncatingOutputStream
	 */
	public long getSent() {
		return ((TruncatingOutputStream) _os).getBytesTransmitted();
	}

	/**
	 * @return bytes already sent plus, for every block still queued, the packets received so far multiplied by
	 *         the packet size
	 */
	public long getDownloaded() {
		long ttl = getSent();
		synchronized (this) {
			for (Iterator i = _pending.iterator(); i.hasNext();) {
				BlockPair bp = (BlockPair) i.next();
				// Widen before multiplying, consistent with getBytesFromCache().
				ttl += (long) bp.getPartiallyReceivedBlock().numReceived() * Dijjer.PACKET_SIZE;
			}
		}
		return ttl;
	}

	/** @return the total length passed to the constructor */
	public long getTotalFileLength() {
		return _length;
	}

	/**
	 * Called when a block's contents do not match its retrieved hash. Re-downloads the block over HTTP to obtain
	 * a reference hash, works out whether the data, the retrieved hash, or both disagree with it, and sends the
	 * corresponding corruption notifications to every peer in the routing table.
	 *
	 * @param retrievedHash hash obtained through the dispatcher
	 * @param actualHash    hash computed from the data that was received
	 * @param bi            the block in question
	 */
	private void handleHashVerificationFailure(VeryLongInteger retrievedHash, VeryLongInteger actualHash, BlockInfo bi) {
		// Find out what the hash should be
		PartiallyReceivedBlock prb = new PartiallyReceivedBlock(Dijjer.PACKETS_IN_BLOCK, Dijjer.PACKET_SIZE);
		try {
			(new HttpBlockReceiver(bi, prb)).start();
		} catch (Exception e) {
			Logger.warning("Exception while trying to deal with hash verification failure", e);
			// Without a complete reference copy there is nothing trustworthy to compare against; hashing the
			// partial block would make both comparisons fail and blame peers for corruption that was never
			// established.
			return;
		}
		VeryLongInteger confirmHash = new VeryLongInteger(prb.getBlock());
		boolean dataHashFailure = !confirmHash.equals(actualHash);
		boolean hashHashFailure = !confirmHash.equals(retrievedHash);
		Logger.info("Sending corruptionNotification for " + bi + " (" + dataHashFailure + ", " + hashHashFailure + ")");
		// Iterate over a copy of the peer list.
		for (Iterator i = ((ArrayList) AbstractFactory.getFactory().getRoutingTable().getPeers().clone()).iterator(); i.hasNext();) {
			AbstractPeer p = (AbstractPeer) i.next();
			if (dataHashFailure) {
				AbstractFactory.getFactory().getUdpSocketManager().send(p,
						DMT.createCorruptionNotification(Misc.nextInt(), bi, false));
			}
			if (hashHashFailure) {
				AbstractFactory.getFactory().getUdpSocketManager().send(p,
						DMT.createCorruptionNotification(Misc.nextInt(), bi, true));
			}
		}
	}

	/**
	 * Pairs a block's identity with the buffer its data is received into.
	 */
	protected static class BlockPair {

		BlockInfo _bi;
		PartiallyReceivedBlock _prb;

		public BlockPair(BlockInfo bi, PartiallyReceivedBlock prb) {
			_bi = bi;
			_prb = prb;
		}

		/** @return the identity of the block */
		public BlockInfo getBlockInfo() {
			return _bi;
		}

		/** @return the buffer the block's packets are received into */
		public PartiallyReceivedBlock getPartiallyReceivedBlock() {
			return _prb;
		}
	}
}
